[low-code CDK] Resumable full refresh support for low-code streams #38300
Conversation
""" | ||
return False | ||
|
||
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: |
This is a new interface method that we need in order to read state sanely from the CheckpointReader. It isn't that useful for unnested streams, but it is critical for parsing substream state as we iterate over parent record state in the CheckpointReader when we do that work.
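For a single-dimension cursor, a minimal sketch of this method could look like the following (only the signature comes from the diff below; the body and the get_stream_state call are assumptions for illustration):

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # Unnested streams track a single state object, so the incoming slice
        # can be ignored and the cursor's current state returned as-is.
        return self.get_stream_state()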
""" | ||
return True | ||
|
||
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: |
This is the one method that really doesn't fit the existing cursor interface. We can default to False, and ultimately we don't even care about the record since we close the slice based on the page number.
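In other words, something like this satisfies the interface (a sketch of the described default):

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        # Record ordering never decides when an RFR slice is closed; the page
        # number does, so a constant False is safe here.
        return False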
        self._current_slice: Optional[StreamSlice] = None
        self._finished_sync = False

    def next(self) -> Optional[Mapping[str, Any]]:
This method is a bit more complicated than it needs to be for streams because it can iterate in two dimensions: over an incoming static set of slices, and dynamically based on the current cursor's stream state.
Since this is only scoped to full refresh streams (not substreams nor incremental), there should only be one static slice to loop over. But we support it regardless, and it positions us better for substream low-code RFR when we prioritize it.
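A hedged sketch of the two-dimensional iteration being described (FULL_REFRESH_COMPLETE_STATE is a hypothetical sentinel; the control flow is a simplified assumption, not the actual implementation):

    def next(self) -> Optional[Mapping[str, Any]]:
        if self._read_state_from_cursor:
            # Dynamic dimension: keep re-reading the current slice while the
            # cursor's state says there are more pages, e.g. for RFR pagination
            state_for_slice = self._cursor.select_state(self._current_slice)
            if state_for_slice == FULL_REFRESH_COMPLETE_STATE:  # hypothetical sentinel
                self._current_slice = self._get_next_slice()
        else:
            # Static dimension: iterate over the fixed set of incoming slices
            self._current_slice = self._get_next_slice()
        return self._current_slice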
I'm a fan of this! This is a pretty big change but the abstractions make it manageable. ✅
I left a few comments, but it's mostly just requests for a few more comments.
@@ -157,3 +159,8 @@ def state_checkpoint_interval(self) -> Optional[int]:
        important state is the one at the beginning of the slice
        """
        return None

    def get_cursor(self) -> Optional[Cursor]:
more evidence the cursor belongs to the stream, not the retriever
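The hunk above only shows where the method lands on the stream; presumably it ships with a no-op default so existing streams are unaffected (a sketch, not the confirmed implementation):

    def get_cursor(self) -> Optional[Cursor]:
        """
        A stream that supports checkpointed reads exposes its cursor here;
        plain streams can keep the default of None.
        """
        return None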
        elif hasattr(model.retriever, "paginator") and model.retriever.paginator and not stream_slicer:
            # To incrementally deliver RFR for low-code we're first implementing this for streams that do not use
            # nested state like substreams or those using list partition routers
            return ResumableFullRefreshCursor(parameters={})
👍
Is there a follow up issue to support substreams?
yep I have it filed here when the original spec was written: https://github.com/airbytehq/airbyte-internal-issues/issues/7528
@@ -68,6 +71,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._last_record: Optional[Record] = None
        self._parameters = parameters
        self._name = InterpolatedString(self._name, parameters=parameters) if isinstance(self._name, str) else self._name
        self._synced_partitions: MutableMapping[Any, bool] = dict()
can you add a comment clarifying what the True means here? Looking at the code, it seems to mean that we started syncing records from a partition.
It's also not obvious that this is only used for RFR.
+1, I made a comment down there but I'm getting stuck on this
good suggestion, will add!
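Something along these lines would address both points (a hypothetical sketch of the requested comment, not the actual follow-up commit):

    # Only used by resumable full refresh (RFR): maps a partition to True once
    # we have started emitting records for it during the current sync.
    self._synced_partitions: MutableMapping[Any, bool] = dict()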
        # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to fetch more records
        # The platform deletes stream state for full refresh streams before starting a new job, so we don't need to worry about
        # this value existing for the initial attempt
        if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
thinking out loud: the fact that there are essentially two different code paths that are mutually exclusive for a given stream makes me think there's an opportunity to introduce a new abstraction where one implementation would be RFR and the other the standard read_records.
I'd love to simplify this, but I'm not sure what the right abstraction is yet
Yeah, I would agree with that. It's kind of our way of not needing to re-implement read_records() like we do for Python sources. It's not immediately clear yet at what level the abstraction needs to sit. But I can't imagine pulling this out into its own abstraction later will be much of a lift.
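One shape such an abstraction could take, purely as a hypothetical sketch of the idea being discussed (none of these class names exist in the CDK, the retriever is left untyped, and the key's value is a placeholder):

    from abc import ABC, abstractmethod
    from typing import Any, Iterable, Mapping

    FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"  # placeholder value

    class ReadStrategy(ABC):
        """Hypothetical seam isolating the two mutually exclusive read paths."""

        def __init__(self, retriever: Any) -> None:
            self._retriever = retriever

        @abstractmethod
        def read(self, stream_slice: Mapping[str, Any], stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
            ...

    class ResumableFullRefreshRead(ReadStrategy):
        def read(self, stream_slice, stream_state):
            # A successful prior attempt means there is nothing left to fetch
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return
            yield from self._retriever.read_records(stream_slice=stream_slice)

    class StandardRead(ReadStrategy):
        def read(self, stream_slice, stream_state):
            yield from self._retriever.read_records(stream_slice=stream_slice)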
    def next(self) -> Optional[Mapping[str, Any]]:
can you add a docstring describing the algorithm in English?
done
    to the Cursor interface.
    """

    def __init__(self, cursor: Cursor, stream_slices: Iterable[Optional[Mapping[str, Any]]], read_state_from_cursor: bool = False):
can you add a comment explaining to a user of this class how to configure read_state_from_cursor?
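For example, usage might look like this (a sketch; the cursor variables are assumptions, while the class name and constructor arguments come from the diff above):

    # RFR: a single static slice, with further pages derived dynamically from
    # the cursor's own state until it reports the sync is complete
    rfr_reader = CursorBasedCheckpointReader(
        cursor=resumable_full_refresh_cursor,
        stream_slices=iter([{}]),
        read_state_from_cursor=True,
    )

    # Incremental: the full set of slices is known up front, so no dynamic iteration
    incremental_reader = CursorBasedCheckpointReader(
        cursor=datetime_based_cursor,
        stream_slices=datetime_based_cursor.stream_slices(),
        read_state_from_cursor=False,
    )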
            self._current_slice = self._get_next_slice()
            return self._current_slice
        if self._read_state_from_cursor:
            state_for_slice = self._cursor.select_state(self._current_slice.get("partition"))
nit: I think it's preferable to call self._current_slice.partition instead of get, as it slowly gets us away from thinking of the slice as an arbitrary dict.
actually I realize it should just be self._current_slice, since select_state() takes in the slice itself, not the partition. But regardless, this is fixed and we don't reference the arbitrary dict. Thanks!
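So the line presumably ends up as (a one-line sketch of the described fix):

    state_for_slice = self._cursor.select_state(self._current_slice)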
Ran regression tests successfully against 3 sources/connections:
        # Always return an empty generator just in case no records were ever yielded
        yield from []
Non-blocking: feels like this should somehow be baked into a base implementation of records_generator_fn instead of being an extra check here 🤔
""" | ||
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in | ||
a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of | ||
a specific parent delineated by the incoming slice's partition object. | ||
""" |
Helpful, thank you!
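For the per-partition case the docstring describes, an implementation might be sketched as follows (hypothetical; _to_partition_key and _state_per_partition are assumed names, not the CDK's):

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        if stream_slice is None:
            return None
        # Return only the child state tracked for the parent identified by this
        # slice's partition, rather than the whole per-partition mapping.
        partition_key = self._to_partition_key(stream_slice.partition)  # assumed helper
        return self._state_per_partition.get(partition_key)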
    Right now only low-code connectors provide cursor implementations, but the logic is extensible to any stream that adheres
    to the Cursor interface.
Is this true? Thinking about file-based and concurrent. Is this because the Abstract class for the declarative cursor is separate from the ones that concurrent and file-based use?
I would say it is true for the current type of Cursor we've promoted up. We're in a messy spot where we have a couple of different cursor interfaces, though, which is not great. But I will clarify this comment since you are right that it's confusing.
Thank you!
            # Unlike RFR cursors that iterate dynamically based on how stream state is updated, most cursors operate on a
            # fixed set of slices determined before reading records. They should just iterate to the next slice
            self._current_slice = self._get_next_slice()
Seeing a parallel between partition generators that generate partitions in advance and the fact that we were talking about creating partitions dynamically based on whether a next page cursor exists 🤔 just an observation
        )
        assert exp == airbyte_stream
        assert airbyte_stream == exp
❤️
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/7000 (nice issue id)
What
https://www.loom.com/share/97b0ee3b050448aba2cbfdd5b7d91e3d
This PR makes all low-code streams support resumable full refresh if they implement a paginator and are not already implemented as incremental syncs.
This approach should work with substreams since the new cursor adheres to the same interface as DatetimeBasedCursor which can be used by a per-partition cursor, but I've explicitly gated it off to reduce the scope.
How
At a high level the changes are:
- ResumableFullRefreshCursor which adheres to the Cursor interface

A few notes on the design:
The new CursorBasedCheckpointReader - something that is really nice about this is that it is both low-code and Python stream agnostic. So long as a stream implements the newly promoted Cursor concept, we can now avoid the checkpoint reader needing to scale with the complexities of state.
I've tried to leverage the existing paginator logic as much as possible and avoid making the retriever and cursor re-implement behavior that already exists. However, it did require changes to allow resetting to a new value, since there was no way to start pagination from anywhere but the start.
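For an offset-based strategy, that change might be sketched like this (an assumption for illustration: reset previously took no arguments and always restarted from zero):

    def reset(self, reset_value: Optional[Any] = None) -> None:
        # Resume pagination from a checkpointed offset instead of always from the start
        self._offset = reset_value if reset_value is not None else 0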
You'll also notice that the entire flow leverages the Cursor and StreamSlice types that were recently promoted in #38077. This helps manage the complexity of the reader code when we can make clear assumptions on what is supplied to the source and passed back from the cursor.

Review guide
resumable_full_refresh_cursor.py
cursor_pagination_strategy.py
page_increment.py
offset_increment.py
default_paginator.py
simple_retriever.py
checkpoint_reader.py
model_to_component_transformer.py
User Impact
A few potential breaking changes:
- PaginationStrategy.reset() now can take in an optional parameter. This would affect custom pagination strategies, but a look through our repo only shows one example, on source-guardian-api, which is minimally used.
- Cursor.select_state() has been added to the interface. This was originally unused by the PerPartitionCursor, but we will eventually need this for substream state. I wired most of it up, but custom components of Cursors/DatetimeBasedCursors need to implement this. It's unused by connectors right now.

Can this PR be safely reverted and rolled back?